package com.hanxiaozhang.example.listener.mgsconsumer;

import com.hanxiaozhang.constant.RocketConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.common.message.MessageQueue;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;


/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/10/1
 * @since 1.0.0
 */
@Slf4j
@Component
public class No5PullMgsOriginalSyntax {



    /*
    获取某个Topic的所有队列：
    Set<MessageQueue> queueSet =  consumer.fetchSubscribeMessageQueues("TopicTest");

    PullStatus：
    FOUND表示拉取到消息
    NO_NEW_MSG表示没有发现新消息
    NO_MATCHED_MSG表示没有匹配的消息
    OFFSET_ILLEGAL表示传入的拉取位点是非法的，有可能偏大或偏小。
     */


    public void pull(long offset, int maxNums) {

        DefaultMQPullConsumer consumer = null;
        try {
            // 1. 初始化DefaultMQPullConsumer并启动
            consumer = new DefaultMQPullConsumer(RocketConstant.PULL_ORIGINAL_CONSUMER_GROUP);
            consumer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
            consumer.start();
            // 2. 构造需要拉取的队列MessageQueue
            MessageQueue mq = new MessageQueue();
            mq.setQueueId(1);
            mq.setTopic(RocketConstant.PULL_ORIGINAL_TOPIC);
            // brokerName在broker.conf 配置
            mq.setBrokerName("broker-a");
            // 3. 拉消息，不阻塞
            PullResult pullResult = consumer.pull(mq, "*", offset, maxNums);
            log.info("{}消息状态：{}", this.getClass().getSimpleName(),  pullResult.getPullStatus());
            // 4. 如果拉取状态是FOUND，通过pullResult.getMsgFoundList()获取拉取到的消息列表。
            if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
                pullResult.getMsgFoundList().forEach(x->{
                    try {
                        log.info("{}收到消息：{}", this.getClass().getSimpleName(),  new String(x.getBody(), "utf-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                });
                // 5. 完成消费，更新偏移量
                consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }

}
